<?php

/**
 * Kafka demo.
 * Requires the Composer package "nmred/kafka-php": "0.1.5".
 *
 * @author ayg
 */

namespace app\index\controller;

/**
 * Demo of producing to and consuming from Kafka through the kafka-php client.
 *
 * Each method talks to the server named in HOST_LIST and prints what it
 * sent or received with dump(). dump() is not defined in this file; it is
 * presumably the framework's debug helper.
 */
class Kafka
{
    /**
     * Host string passed as both the first and the third getInstance() argument.
     *
     * NOTE(review): in kafka-php 0.1.x these are presumably the ZooKeeper host
     * list and the Kafka broker list respectively - confirm against the library.
     */
    const HOST_LIST = '192.168.1.94:9092';

    /**
     * Timeout passed as the second getInstance() argument.
     *
     * NOTE(review): unit is defined by kafka-php - confirm before changing.
     */
    const TIMEOUT = 3000;

    /** Topic that the demo writes to and reads from. */
    const TOPIC = 'test';

    /** Consumer group registered by consumer(). */
    const GROUP = 'testgroup';

    /**
     * Partition ids of TOPIC used by both methods.
     *
     * Kept as a static property rather than a constant because array-valued
     * class constants need PHP 5.6+.
     *
     * @var int[]
     */
    private static $partitions = [0, 1];

    /**
     * Produce: queue one timestamp message per partition, send them, and dump the result.
     *
     * @return void
     */
    public function produce()
    {
        $producer = \Kafka\Produce::getInstance(self::HOST_LIST, self::TIMEOUT, self::HOST_LIST);

        // NOTE(review): -1 presumably means "wait for all in-sync replicas",
        // as in the Kafka produce protocol - confirm against kafka-php.
        $producer->setRequireAck(-1);

        foreach (self::$partitions as $partitionId) {
            // date() is evaluated per partition, so each message carries its own timestamp.
            $producer->setMessages(self::TOPIC, $partitionId, [date('Y-m-d H:i:s')]);
        }

        dump($producer->send());
    }

    /**
     * Consume: fetch from every demo partition and dump each message body.
     *
     * @return void
     */
    public function consumer()
    {
        $consumer = \Kafka\Consumer::getInstance(self::HOST_LIST, self::TIMEOUT, self::HOST_LIST);
        $consumer->setGroup(self::GROUP);

        foreach (self::$partitions as $partitionId) {
            $consumer->setPartition(self::TOPIC, $partitionId);
        }

        // The fetch result nests three levels deep: topic, then partition, then message set.
        foreach ($consumer->fetch() as $topicPartitions) {
            foreach ($topicPartitions as $messageSet) {
                foreach ($messageSet as $message) {
                    dump($message->getMessage());
                    // Push the output to the client after every message instead of at script end.
                    flush();
                }
            }
        }
    }
}
